{
 "cells": [
  {
   "cell_type": "raw",
   "metadata": {
    "raw_mimetype": "text/restructuredtext"
   },
   "source": [
    ".. _nb_parallelization:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parallelization"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Most algorithms in our framework are population based. Therefore, the evaluation function receives not only one but multiple solutions at a time. The amount of individuals depends on the algorithm logic and population size.\n",
    "The function targets to set objective and constraint values into a matrix and the evaluation can be done independent of *pymoo*. In the following different implementations of the evaluation function are discussed."
   ]
  },
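  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough, framework-independent sketch of this contract (plain NumPy, no *pymoo* calls; the names `sphere` and `evaluate_population` are hypothetical and only serve as illustration), the snippet below maps a population matrix `X` with one row per individual to an objective matrix `F` with one row per individual and one column per objective:\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "def sphere(x):\n",
    "    # objective value of a single solution: sum of squared variables\n",
    "    return np.sum(x ** 2)\n",
    "\n",
    "def evaluate_population(X):\n",
    "    # one row per individual, one column per objective\n",
    "    return np.array([[sphere(x)] for x in X])\n",
    "\n",
    "X = np.random.random((5, 10))  # 5 individuals with 10 variables each\n",
    "F = evaluate_population(X)\n",
    "print(F.shape)\n",
    "```\n",
    "\n",
    "Looping over the rows like this is the element-wise view of the evaluation; a vectorized implementation computes the same matrix in a single call."
   ]
  },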
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Vectorized"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By default matrix and vector based operations are used to speed up calculations. An example of a vectorized evaluation is already provided in our [Getting Started Guide](../getting_started.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import autograd.numpy as anp\n",
    "\n",
    "from pymoo.model.problem import Problem\n",
    "\n",
    "class MyProblem(Problem):\n",
    "\n",
    "    def __init__(self, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)\n",
    "\n",
    "    def _evaluate(self, x, out, *args, **kwargs):\n",
    "         out[\"F\"] = anp.sum(x ** 2, axis=1) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By default no parallelization is applied and only the code is executed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "problem = MyProblem(parallelization = None)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create some sample input:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "X = np.random.random((10000, problem.n_var))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "223 µs ± 13 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
     ]
    }
   ],
   "source": [
    "%timeit F = problem.evaluate(X)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Multiple Threads \n",
    "\n",
    "Another way of defining a problem is by using the `elementwise_evaluation` and then execute each evaluation is a different thread. This can be automatically achieved by the following code:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "class MyProblem(Problem):\n",
    "\n",
    "    def __init__(self, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, \n",
    "                         elementwise_evaluation=True, **kwargs)\n",
    "\n",
    "    def _evaluate(self, x, out, *args, **kwargs):\n",
    "         out[\"F\"] = (x ** 2).sum()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "problem = MyProblem(parallelization = (\"threads\", 4))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "147 ms ± 2.48 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
     ]
    }
   ],
   "source": [
    "%timeit F = problem.evaluate(X)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Please note, that for small examples as shown here vectorization will always be faster. However, problem evaluation functions which are based on simulations can often not be easily vectorized because of their independent behavior."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**NOTE:** If your are a Windows user your function call of the minimize method needs to happen in a main method. The reason is explained [here](https://stackoverflow.com/questions/24374288/where-to-put-freeze-support-in-a-python-script) and an example can be found [here](https://github.com/msu-coinlab/pymoo/issues/23). Otherwise, you will get an error message saying to call freeze_support() somewhere in your code."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generic Starmap Interface\n",
    "\n",
    "Another way of parallelized evaluation is to provide an external function that provides the \"starmap\" interface,\n",
    "as in the Python standard library `multiprocessing.Pool.starmap` [function](https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#multiprocessing.pool.Pool.starmap).\n",
    "\n",
    "**NOTE:** For this example the problem is supposed to be of type `elementwise_evaluation=True`."
   ]
  },
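  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To illustrate what the starmap interface amounts to: it is a callable that takes a function and an iterable of argument tuples and returns the list of results. The sketch below uses only the standard library and does not call *pymoo*; `sphere` and `serial_starmap` are hypothetical names, and a thread pool is assumed to be an acceptable stand-in for a process pool.\n",
    "\n",
    "```python\n",
    "from itertools import starmap\n",
    "from multiprocessing.pool import ThreadPool\n",
    "\n",
    "def sphere(x):\n",
    "    # objective value of a single solution\n",
    "    return sum(v * v for v in x)\n",
    "\n",
    "def serial_starmap(func, iterable):\n",
    "    # same call signature as Pool.starmap, but evaluated sequentially\n",
    "    return list(starmap(func, iterable))\n",
    "\n",
    "# each entry is the argument tuple of one function call\n",
    "args = [([1.0, 2.0],), ([3.0, 4.0],)]\n",
    "\n",
    "with ThreadPool(2) as pool:\n",
    "    parallel = pool.starmap(sphere, args)\n",
    "\n",
    "serial = serial_starmap(sphere, args)\n",
    "print(parallel == serial)\n",
    "```\n",
    "\n",
    "Because both callables share the same signature and return type, either one could in principle be handed over wherever a starmap function is expected."
   ]
  },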
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    },
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "import multiprocessing\n",
    "\n",
    "pool = multiprocessing.Pool()\n",
    "problem = MyProblem(parallelization = ('starmap', pool.starmap))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "94.3 ms ± 15.2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
     ]
    }
   ],
   "source": [
    "%timeit problem.evaluate(X)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "pool.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dask"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "More advanced is to distribute the evaluation function to a couple of workers. There exists a couple of framework that support the distribution of code. For our framework, we recommend using [Dask](https://dask.org).\n",
    "\n",
    "A documentation to setup the cluster is available [here](https://docs.dask.org/en/latest/setup/cli.html). Basically, you first start a scheduler somewhere and then connect workers to it. Then, a client object connects to the scheduler and distributes the jobs automatically for you."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client\n",
    "client = Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "from pymoo.model.problem import Problem\n",
    "\n",
    "# define the evaluation function that directly returns the objective and/or\n",
    "# constraint values. Please note it must be elementwise_evaluation.\n",
    "def fun(x):\n",
    "    return {\n",
    "        \"F\": np.sum(x ** 2)\n",
    "    }\n",
    "\n",
    "# define a problem without any evaluation function - everything is done by parallelization\n",
    "class MyProblem(Problem):\n",
    "\n",
    "    def __init__(self, *args, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, \n",
    "                         elementwise_evaluation=True, *args, **kwargs)\n",
    "\n",
    "# create the problem and set the parallelization to dask\n",
    "problem = MyProblem(parallelization=(\"dask\", client, fun))\n",
    "\n",
    "X = np.random.random((10, problem.n_var))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "37.4 ms ± 7.88 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "%timeit problem.evaluate(X)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Manually use Dask**\n",
    "\n",
    "In order to have full control over the distribution you can use Dask manually in the _evaluate function. By using the *parallelization* this is done internally, however, it might be useful to have full control over it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "class MyProblem(Problem):\n",
    "\n",
    "    def __init__(self, *args, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, \n",
    "                         elementwise_evaluation=True, *args, **kwargs)\n",
    "\n",
    "    def _evaluate(self, X, out, *args, **kwargs):\n",
    "        \n",
    "        def fun(x):\n",
    "            return np.sum(x ** 2) \n",
    "        \n",
    "        jobs = [client.submit(fun, x) for x in X]\n",
    "        out[\"F\"] = np.row_stack([job.result() for job in jobs])\n",
    "        \n",
    "\n",
    "problem = MyProblem()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "485 ms ± 45.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "%timeit problem.evaluate(X)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Do not forget to close the client if it is running locally!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.close()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
